package com.atguigu.day09;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink07_SQL_KafaToKafka {
    public static void main(String[] args) {
        //1.获取流的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2.创建表连接到kafka的Topic
        tableEnv.executeSql("create table sensor_source(" +
                "id String," +
                "ts bigint," +
                "vc int" +
                ")with(" +
                "'connector' = 'kafka'," +
                "'topic' = 'topic_source_sensor'," +
                "'properties.bootstrap.servers' = 'hadoop102:9092'," +
                "'properties.group.id' = '0718'," +
                "'scan.startup.mode' = 'latest-offset'," +
                "'format' = 'csv'" +
                ")");

        tableEnv.executeSql("create table sensor_sink(" +
                "id String," +
                "ts bigint," +
                "vc int" +
                ")with(" +
                "'connector' = 'kafka'," +
                "'topic' = 'topic_sink_sensor'," +
                "'properties.bootstrap.servers' = 'hadoop102:9092'," +
                "'format' = 'csv'" +
                ")");

        tableEnv.executeSql("insert into sensor_sink select * from sensor_source where id = 'sensor_1'");
    }
}
